Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
installs
node_modules/
.travis.bak

t.bat
# Added for pedestal framework test
.lein-deps-sum

Expand Down
4 changes: 2 additions & 2 deletions frameworks/Rust/water-http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
name = "water-http"
version = "0.1.0"
edition = "2018"
edition = "2024"

[dependencies]
askama = "0.14.0"
tokio = { version = "1.47.1", features = ["full"] }
water_http = { version = "3.2.1-beta.14" ,optional = true , features = ["use_only_http1"]}
water_http = { features = ["use_io_uring","use_only_http1"],optional = true , version = "3.4.2-beta.4" }
smallvec = "1.15.1"
nanorand = "0.8.0"
tokio-postgres = "0.7.15"
Expand Down
66 changes: 2 additions & 64 deletions frameworks/Rust/water-http/benchmark_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,13 @@
"framework": "water-http",
"tests": [
{
"default": {
"default": {
"json_url": "/json",
"plaintext_url": "/plaintext",
"fortune_url": "/fortunes",
"db_url": "/db",
"query_url": "/queries?q=",
"update_url": "/updates?q=",
"port": 8080,
"approach": "Realistic",
"classification": "Fullstack",
"database": "Postgres",
"framework": "water_http",
"language": "Rust",
"orm": "raw",
"platform": "Rust",
"webserver": "water_http",
"os": "Linux",
"database_os": "Linux",
"display_name": "water_http"
},

"db": {
"fortune_url": "/fortunes",
"db_url": "/db",
"query_url": "/queries?q=",
"update_url": "/updates?q=",
"port": 8080,
"approach": "Realistic",
"classification": "Fullstack",
"database": "Postgres",
"framework": "water_http",
"language": "Rust",
"orm": "raw",
"platform": "Rust",
"webserver": "water_http",
"os": "Linux",
"database_os": "Linux",
"display_name": "water_http"
},

"cached": {
"cached_query_url": "/cached-queries?q=",
"port": 8080,
"approach": "Realistic",
Expand All @@ -57,35 +23,7 @@
"os": "Linux",
"database_os": "Linux",
"display_name": "water_http"
},
"json": {
"json_url": "/json",
"port": 8080,
"approach": "Realistic",
"classification": "Fullstack",
"framework": "water_http",
"language": "Rust",
"orm": "raw",
"platform": "Rust",
"webserver": "water_http",
"os": "Linux",
"database_os": "Linux",
"display_name": "water_http"
},
"plaintext": {
"plaintext_url": "/plaintext",
"port": 8080,
"approach": "Realistic",
"classification": "Fullstack",
"framework": "water_http",
"language": "Rust",
"orm": "raw",
"platform": "Rust",
"webserver": "water_http",
"os": "Linux",
"database_os": "Linux",
"display_name": "water_http"
}
}
]
]
}
11 changes: 7 additions & 4 deletions frameworks/Rust/water-http/src/cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ pub struct ThreadSharedStruct{
impl ThreadSharedStruct {

#[inline(always)]
pub fn get_value(id:i32)->&'static i32{
pub fn get_value(id:i32)->Option<&'static i32>{
let map = unsafe {CACHED_VALUES.as_ref().unwrap().get(&id)} ;
map.unwrap()
map
}
pub fn get_cached_queries(&self,num:usize)->&[u8]{
let buf = unsafe{&mut *(self.writing_buffer.get())};
Expand All @@ -181,8 +181,11 @@ impl ThreadSharedStruct {
let mut writer = BytesMuteWriter(buf);
let mut rn = self.rng.clone();
for _ in 0..num {
let rd: i32 = (rn.generate::<u32>() & 0x3FFF) as i32 % 10_000 + 1;
let v = Self::get_value(rd);
let rd = (rn.generate::<u32>() % 10_000 ) as i32;
let v = match Self::get_value(rd) {
None => {continue}
Some(c) => {c}
};
writer.extend_from_slice(br"{");
_ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v);
writer.extend_from_slice(br"},");
Expand Down
72 changes: 56 additions & 16 deletions frameworks/Rust/water-http/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
#![cfg(any(feature = "db",feature = "all"))]
use std::{borrow::Cow, io};
use std::{borrow::Cow, io, ptr};
use std::fmt::Arguments;
use std::io::Write;
use std::mem::MaybeUninit;
use std::rc::Rc;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use bytes::Buf;
use nanorand::{Rng, WyRand};
use tokio_postgres::{connect, Client, Statement, NoTls};
use tokio_postgres::{connect, Client, Statement, NoTls, Error};
use tokio_postgres::types::private::BytesMut;
use crate::models::{Fortune, FortuneTemplate, World};
use sonic_rs::prelude::WriteExt;
use yarte::TemplateBytesTrait;
pub static mut CACHED_VALUES:Option<HashMap<i32,i32>> = None;

/// Database connection pool with thread-local RNG
pub struct DbConnectionPool {
Expand Down Expand Up @@ -130,7 +132,7 @@ impl PgConnection {
} /// Connect to the database

#[inline(always)]
pub fn generate_update_values_stmt(batch_size: usize) -> String {
pub fn generate_update_values_stmt(batch_size: usize) -> String {

let mut sql = String::from("UPDATE world SET randomNumber = w.r FROM (VALUES ");

Expand All @@ -156,38 +158,35 @@ impl PgConnection {
/// Get a single random world - optimized with buffer reuse
#[inline]
pub async fn get_world(&self) -> &[u8] {
let rd = (self.rang.clone().generate::<u32>() % 10_000 + 1) as i32;
let rd = (self.rang.clone().generate::<u32>() % 10_000 ) as i32;
let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap();

let buffers = self.buffers();
buffers.body.clear();

sonic_rs::to_writer(
BytesMuteWriter(&mut buffers.body),
&World {
id: row.get(0),
randomnumber: row.get(1),
},
).unwrap();

buffers.body.chunk()
}

/// Get multiple random worlds - optimized with buffer reuse
pub async fn get_worlds(&self, num: usize) -> &[u8] {
let buffers = self.buffers();
buffers.worlds.clear();
let mut worlds = Vec::with_capacity(num);
let mut rn = self.rang.clone();
for _ in 0..num {
let id: i32 = (rn.generate::<u32>() & 0x3FFF) as i32 % 10_000 + 1;
let id = (self.rang.clone().generate::<u32>() % 10_000 ) as i32;
let row = self.cl.query_one(&self.world, &[&id]).await.unwrap();
buffers.worlds.push(World {
worlds.push(World {
id: row.get(0),
randomnumber: row.get(1),
});
}
buffers.body.clear();
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap();
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap();
buffers.body.chunk()
}
/// Update worlds in batch - optimized with buffer reuse
Expand Down Expand Up @@ -215,23 +214,24 @@ impl PgConnection {
futures.extend(ids.iter().map(|x| async move {self.cl.query_one(&self.world,&[&x]).await}));
futures_util::future::join_all(futures).await;
ids.sort_unstable();
buffers.worlds.clear();
let mut worlds = Vec::with_capacity(num);
let mut numbers = Vec::with_capacity(num);
for index in 0..num {
let s_id = (rng.generate::<u32>() % 10_000 + 1 ) as i32;
buffers.worlds.push(World{
worlds.push(World{
id:ids[index],
randomnumber:s_id
});
buffers.numbers.push(s_id);
numbers.push(s_id);
}
buffers.body.clear();
for index in 0..num {
params.push(&ids[index]);
params.push(&buffers.numbers[index]);
params.push(&numbers[index]);
}

_=self.cl.execute(&self.updates[num - 1], &params).await.unwrap();
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap();
sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &worlds).unwrap();
buffers.body.chunk()
}

Expand Down Expand Up @@ -264,11 +264,51 @@ impl PgConnection {
// Return reference to buffer - zero-copy!
Ok(&buffers.fortune_output)
}


pub fn get_cached_queries(&self,num:usize)->&[u8]{
let buf = self.buffers();
let buf = &mut buf.body;
buf.clear();
buf.extend_from_slice(br#"["#);
let mut writer = BytesMuteWriter(buf);
let mut rn = self.rang.clone();
for _ in 0..num {
let rd = (rn.generate::<u32>() % 10_000 ) as i32;
let v = match self.get_world_id_for_cache(rd){
None => {continue}
Some(e)=>{e}
};
writer.extend_from_slice(br"{");
_ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v);
writer.extend_from_slice(br"},");
}
if buf.len() >1 {buf.truncate(buf.len() - 1);}
buf.extend_from_slice(b"]");
return &buf[..]
}

fn get_world_id_for_cache(&self, id: i32) -> Option<i32> {
unsafe {
let ptr = ptr::addr_of!(CACHED_VALUES);

match &*ptr {
Some(map) => map.get(&id).copied(),
None => None,
}
}
}
}

/// Zero-copy writer for BytesMut
pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut);
impl BytesMuteWriter<'_> {

#[inline(always)]
pub fn extend_from_slice(&mut self,data:&[u8]){
self.0.extend_from_slice(data);
}
}
impl Write for BytesMuteWriter<'_> {
#[inline(always)]
fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
Expand Down
Loading
Loading