feat: Various KVBM improvements #1604

Open: wants to merge 4 commits into base branch ziqif/locality-py-bindings.
lib/bindings/python/rust/llm/block_manager/distributed/leader.rs

```diff
@@ -3,8 +3,7 @@
 
 use super::*;
 
-use llm_rs::block_manager::distributed::KvbmLeader as KvbmLeaderImpl;
-
+use llm_rs::block_manager::distributed::{KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig};
 
 #[pyclass]
 pub struct KvbmLeader {
@@ -14,10 +13,16 @@ pub struct KvbmLeader {
 #[pymethods]
 impl KvbmLeader {
     #[new]
-    #[pyo3(signature = (barrier_id, world_size=1))]
-    fn new(barrier_id: String, world_size: usize) -> PyResult<Self> {
+    #[pyo3(signature = (barrier_id, bytes_per_block, world_size))]
+    fn new(barrier_id: String, bytes_per_block: usize, world_size: usize) -> PyResult<Self> {
+        let config = KvbmLeaderConfig::builder()
+            .barrier_id(barrier_id)
+            .bytes_per_block(bytes_per_block)
+            .world_size(world_size)
+            .build()
+            .map_err(to_pyerr)?;
 
-        let leader = KvbmLeaderImpl::new(barrier_id, world_size).map_err(to_pyerr)?;
+        let leader = KvbmLeaderImpl::new(config).map_err(to_pyerr)?;
 
         Ok(Self {
             _impl: Arc::new(leader),
```
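For context, a minimal Rust-side sketch of the new construction path (illustrative values; the function name is hypothetical). Per the builder defaults added in leader.rs further below, `barrier_id` falls back to `"kvbm"` and `world_size` to `1` when omitted:

```rust
use llm_rs::block_manager::distributed::{KvbmLeader, KvbmLeaderConfig};

fn single_worker_leader() -> anyhow::Result<KvbmLeader> {
    // Only bytes_per_block is required; 2 MiB here is an arbitrary example.
    let config = KvbmLeaderConfig::builder()
        .bytes_per_block(2 * 1024 * 1024)
        .build()?;

    // Blocks until the worker(s) have joined the barrier and ZMQ is connected.
    KvbmLeader::new(config)
}
```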
29 changes: 14 additions & 15 deletions lib/bindings/python/rust/llm/block_manager/distributed/worker.rs
```diff
@@ -3,7 +3,7 @@
 
 use super::*;
 
-use llm_rs::block_manager::distributed::KvbmWorker as KvbmWorkerImpl;
+use llm_rs::block_manager::distributed::{KvbmWorker as KvbmWorkerImpl, KvbmWorkerConfig};
 use llm_rs::block_manager::storage::torch::{TorchDevice, TorchTensor};
 
 /// A wrapper around a Torch tensor.
@@ -79,11 +79,9 @@ pub struct KvbmWorker {
 #[pymethods]
 impl KvbmWorker {
     #[new]
-    #[pyo3(signature = (num_device_blocks, num_host_blocks, num_disk_blocks, page_size, tensors, device_id=0, worker_id=0, dtype=None, barrier_id="kvbm".to_string()))]
+    #[pyo3(signature = (num_device_blocks, page_size, tensors, device_id=0, worker_id=0, dtype=None, barrier_id="kvbm".to_string()))]
     fn new(
         num_device_blocks: usize,
-        num_host_blocks: usize,
-        num_disk_blocks: usize,
         page_size: usize,
         tensors: Vec<Py<PyAny>>,
         device_id: usize,
@@ -100,17 +98,18 @@ impl KvbmWorker {
             vllm_tensors.push(Box::new(vllm_tensor));
         }
 
-        let worker = KvbmWorkerImpl::new(
-            num_device_blocks,
-            num_host_blocks,
-            num_disk_blocks,
-            page_size,
-            vllm_tensors,
-            device_id,
-            worker_id,
-            dtype,
-            barrier_id,
-        ).map_err(to_pyerr)?;
+        let config = KvbmWorkerConfig::builder()
+            .num_device_blocks(num_device_blocks)
+            .page_size(page_size)
+            .tensors(vllm_tensors)
+            .device_id(device_id)
+            .worker_id(worker_id)
+            .dtype(dtype)
+            .barrier_id(barrier_id)
+            .build()
+            .map_err(to_pyerr)?;
+
+        let worker = KvbmWorkerImpl::new(config).map_err(to_pyerr)?;
 
         Ok(Self {
             _impl: Arc::new(worker),
```
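A matching worker-side sketch (again illustrative; it assumes, as the test added in distributed.rs below does, that the builder provides defaults for the omitted fields such as `page_size`, `device_id`, `dtype`, and `barrier_id`):

```rust
use llm_rs::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use llm_rs::block_manager::storage::torch::TorchTensor;

fn start_worker(
    tensors: Vec<Box<dyn TorchTensor>>,
    worker_id: usize,
) -> anyhow::Result<KvbmWorker> {
    let config = KvbmWorkerConfig::builder()
        .num_device_blocks(8) // illustrative
        .tensors(tensors)
        .worker_id(worker_id)
        .build()?;

    // Returns immediately; the worker finishes etcd + ZMQ setup in the background.
    KvbmWorker::new(config)
}
```

Note the asymmetry called out in the test below: `KvbmWorker::new` returns immediately, while `KvbmLeader::new` blocks until every worker has connected.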
1 change: 1 addition & 0 deletions lib/llm/Cargo.toml
```diff
@@ -33,6 +33,7 @@ default = ["block-manager", "testing-full"]
 testing-full = ["testing-cuda", "testing-nixl"]
 testing-cuda = ["dep:cudarc"]
 testing-nixl = ["dep:nixl-sys"]
+testing-etcd = []
 block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:ndarray", "dep:nix"]
 sentencepiece = ["dep:sentencepiece"]
```
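The new `testing-etcd` feature declares no dependencies of its own; it exists purely to gate tests that need a live etcd instance, such as the leader/worker sync test added in distributed.rs below.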
91 changes: 89 additions & 2 deletions lib/llm/src/block_manager/distributed.rs
```diff
@@ -8,5 +8,92 @@ mod zmq;
 mod leader;
 mod worker;
 
-pub use leader::KvbmLeader;
-pub use worker::KvbmWorker;
+pub use leader::{KvbmLeader, KvbmLeaderConfig};
+pub use worker::{KvbmWorker, KvbmWorkerConfig};
+
+#[cfg(all(test, feature = "testing-cuda", feature = "testing-etcd"))]
+mod tests {
+    use crate::block_manager::storage::{
+        torch::{TorchDevice, TorchTensor},
+        DeviceAllocator, Storage, StorageAllocator,
+    };
+
+    use anyhow::Result;
+
+    use dynamo_runtime::logging::init as init_logging;
+
+    use super::*;
+
+    #[derive(Clone, Debug)]
+    struct MockTensor {
+        ptr: u64,
+    }
+
+    impl MockTensor {
+        fn new() -> Self {
+            let allocator = DeviceAllocator::new(0).unwrap();
+
+            let device_storage = std::mem::ManuallyDrop::new(allocator.allocate(4096).unwrap());
+
+            let ptr = device_storage.addr();
+            Self { ptr }
+        }
+    }
+
+    impl TorchTensor for MockTensor {
+        fn device(&self) -> TorchDevice {
+            TorchDevice::Cuda(0)
+        }
+
+        fn data_ptr(&self) -> u64 {
+            self.ptr
+        }
+
+        fn size_bytes(&self) -> usize {
+            1024 * 1024 * 1024
+        }
+
+        fn shape(&self) -> Vec<usize> {
+            vec![2, 8, 32]
+        }
+
+        fn stride(&self) -> Vec<usize> {
+            vec![512, 64, 1]
+        }
+    }
+
+    #[test]
+    fn test_leader_worker_sync() -> Result<()> {
+        init_logging();
+
+        const NUM_WORKERS: usize = 4;
+
+        let mut workers = Vec::new();
+
+        // We're actually able to test this all in a single thread.
+        // Worker startup is async. It returns immediately, and spins up a worker which waits on etcd + zmq init.
+        // On the other hand, the leader startup is fully synchronous. It will only return once it's established a zmq connection with all workers.
+        for i in 0..NUM_WORKERS {
+            let tensors: Vec<Box<dyn TorchTensor>> = vec![Box::new(MockTensor::new())];
+
+            let config = KvbmWorkerConfig::builder()
+                .num_device_blocks(8)
+                .tensors(tensors)
+                .worker_id(i)
+                .build()?;
+
+            let worker = KvbmWorker::new(config)?;
+            workers.push(worker);
+        }
+
+        let leader_config = KvbmLeaderConfig::builder()
+            .world_size(NUM_WORKERS)
+            .bytes_per_block(1)
+            .build()?;
+
+        // When/if this returns, we know that all the workers were also successful.
+        let _ = KvbmLeader::new(leader_config)?;
+
+        Ok(())
+    }
+}
```
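Because the test module above is gated on both `testing-cuda` and `testing-etcd`, it compiles and runs only when both features are enabled, for example with `cargo test --features testing-cuda,testing-etcd` from `lib/llm` (assuming a CUDA device and a reachable etcd instance are available).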
53 changes: 46 additions & 7 deletions lib/llm/src/block_manager/distributed/leader.rs
```diff
@@ -8,6 +8,7 @@ use zmq::*;
 
 use dynamo_runtime::{utils::leader_worker_barrier::LeaderBarrier, DistributedRuntime};
 
+use derive_builder::Builder;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use std::time::Duration;
@@ -19,6 +20,36 @@ pub struct KvbmLeaderData {
     pub zmq_url: String,
     pub broadcast_port: usize,
     pub ack_port: usize,
+    pub num_host_blocks: usize,
+    pub num_disk_blocks: usize,
 }
 
+fn compute_num_blocks(env_var: &str, bytes_per_block: usize) -> usize {
+    let cache_size_gb = std::env::var(env_var)
+        .unwrap_or_default()
+        .parse::<usize>()
+        .unwrap_or(0);
+    (cache_size_gb * 1_000_000_000) / bytes_per_block
+}
+
+#[derive(Builder, Clone, Debug)]
+pub struct KvbmLeaderConfig {
+    /// Amount of bytes within a full kv cache block (summed across all ranks).
+    bytes_per_block: usize,
+
+    /// The barrier id to use for syncing with workers.
+    #[builder(default = "String::from(\"kvbm\")")]
+    barrier_id: String,
+
+    /// The world size.
+    #[builder(default = "1")]
+    world_size: usize,
+}
+
+impl KvbmLeaderConfig {
+    pub fn builder() -> KvbmLeaderConfigBuilder {
+        KvbmLeaderConfigBuilder::default()
+    }
+}
 
 /// The leader of the KVBM.
```
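As a quick sanity check of the sizing arithmetic in `compute_num_blocks` (a standalone re-implementation with made-up numbers, not code from this PR): the environment variables are read as whole gigabytes, so a 4 GB cache with 2 MB blocks yields 2000 blocks.

```rust
fn num_blocks(cache_size_gb: usize, bytes_per_block: usize) -> usize {
    // Same formula as compute_num_blocks above, minus the env-var lookup.
    (cache_size_gb * 1_000_000_000) / bytes_per_block
}

fn main() {
    // DYNAMO_KVBM_CPU_CACHE=4 with 2 MB blocks: 4_000_000_000 / 2_000_000 = 2000.
    assert_eq!(num_blocks(4, 2_000_000), 2000);
    // An unset or unparsable variable parses to 0 GB, so that tier gets no blocks.
    assert_eq!(num_blocks(0, 2_000_000), 0);
}
```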
```diff
@@ -35,25 +66,33 @@ pub struct KvbmLeader {
 }
 
 impl KvbmLeader {
-    pub fn new(barrier_id: String, world_size: usize) -> anyhow::Result<Self> {
+    pub fn new(config: KvbmLeaderConfig) -> anyhow::Result<Self> {
         let (drt, runtime) = build_drt()?;
 
         tracing::info!(
             "Syncing leader barrier with {} workers on barrier id {}",
-            world_size,
-            barrier_id
+            config.world_size,
+            config.barrier_id
         );
 
+        let num_host_blocks = compute_num_blocks("DYNAMO_KVBM_CPU_CACHE", config.bytes_per_block);
+        let num_disk_blocks = compute_num_blocks("DYNAMO_KVBM_DISK_CACHE", config.bytes_per_block);
+
         // TODO: For now, just hardcode localhost.
         let zmq_data = Arc::new(KvbmLeaderData {
             zmq_url: "127.0.0.1".to_string(),
             broadcast_port: 5555,
             ack_port: 5556,
+            num_host_blocks,
+            num_disk_blocks,
         });
 
         // Build our leader barrier and publish the data.
-        let leader_barrier: LeaderBarrier<KvbmLeaderData, ()> =
-            LeaderBarrier::new(barrier_id, world_size, Some(Duration::from_secs(30)));
+        let leader_barrier: LeaderBarrier<KvbmLeaderData, ()> = LeaderBarrier::new(
+            config.barrier_id,
+            config.world_size,
+            Some(Duration::from_secs(30)),
+        );
 
         let drt_clone = drt.clone();
         let zmq_data_clone = zmq_data.clone();
@@ -68,7 +107,7 @@ impl KvbmLeader {
             })
             .map_err(|e| anyhow::anyhow!("Failed to sync leader barrier: {:?}", e))?;
 
-        tracing::info!("Leader barrier synced with {} workers", world_size);
+        tracing::info!("Leader barrier synced with {} workers", config.world_size);
 
         // Now, create our active message leader.
         // This also blocks until a ZMQ connection has been established.
@@ -78,7 +117,7 @@
             &zmq_data.zmq_url,
             zmq_data.broadcast_port,
             zmq_data.ack_port,
-            world_size,
+            config.world_size,
             Duration::from_secs(30),
             cancel_token.clone(),
         )
```