15 changes: 14 additions & 1 deletion README.md
@@ -309,7 +309,20 @@ For two-micro-batch overlapping, you can refer to the following figure. With our

#### Easier potential overall design

The current DeepEP implementation uses queues for communication buffers which save memory but introduce complexity and potential deadlocks. If you're implementing your own version based on DeepEP, consider using fixed-size buffers allocated to maximum capacity for simplicity and better performance. For a detailed discussion of this alternative approach, see https://github.com/deepseek-ai/DeepEP/issues/39.
The current DeepEP implementation uses queues for communication buffers, which saves memory but introduces complexity and potential deadlocks. As suggested in [issue #39](https://github.com/deepseek-ai/DeepEP/issues/39), we are working on an alternative approach that uses fixed-size buffers allocated to the maximum capacity. This simplifies the design and eliminates the potential deadlocks of queue-based communication.

This approach:
1. Allocates buffers directly based on the maximum possible number of tokens
2. Allows direct address calculation when sending, eliminating the need for a dynamic queue
3. Implements a dynamic buffer resizing strategy that expands the buffer when any rank's buffer size is insufficient and shrinks the buffer when it hasn't been fully utilized for an extended period (see the sketch below)
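
A minimal Python sketch of such a resizing policy is shown below. The class name, thresholds, and growth/shrink factors are purely illustrative and not part of DeepEP; the actual bookkeeping lives in the C++ runtime:

```python
class ResizePolicy:
    """Illustrative grow-fast / shrink-slow policy for a per-rank token buffer."""

    def __init__(self, initial_capacity: int, shrink_patience: int = 128, low_utilization: float = 0.5):
        self.capacity = initial_capacity          # current capacity, in tokens
        self.shrink_patience = shrink_patience    # steps of low utilization before shrinking
        self.low_utilization = low_utilization    # fraction of capacity considered "low" usage
        self.underused_steps = 0

    def update(self, num_tokens: int) -> int:
        """Return the capacity to use for this step, expanding or shrinking as needed."""
        if num_tokens > self.capacity:
            # Expand immediately: any overflowing rank forces a larger allocation.
            self.capacity = max(num_tokens, self.capacity * 2)
            self.underused_steps = 0
        elif num_tokens < self.capacity * self.low_utilization:
            # Shrink only after a sustained run of under-utilization.
            self.underused_steps += 1
            if self.underused_steps >= self.shrink_patience:
                self.capacity = max(num_tokens, self.capacity // 2)
                self.underused_steps = 0
        else:
            self.underused_steps = 0
        return self.capacity
```

In a multi-rank setting the new capacity would also have to be agreed on by all ranks before buffers are re-allocated, since senders compute remote addresses directly from it.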

You can enable this experimental feature by setting `dynamic_buffer_resize=True` when creating a `Buffer` instance:

```python
buffer = Buffer(group, num_nvl_bytes, num_rdma_bytes, dynamic_buffer_resize=True)
```

While this approach may use more GPU memory (the exact amount depends on the specific scenario), the implementation is much simpler. You can more easily add new features, and the performance ceiling might be slightly better.
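
If the new C++ accessors (`get_max_tokens_observed` / `reset_max_tokens_observed`) are exposed through the Python bindings, the observed maximum can be inspected between steps. The snippet below is a sketch under that assumption, not a documented API:

```python
buffer = Buffer(group, num_nvl_bytes, num_rdma_bytes, dynamic_buffer_resize=True)

# ... run some dispatch/combine iterations ...

# Hypothetical: assumes the accessors declared in csrc/deep_ep.hpp are bound to Python.
if buffer.dynamic_buffer_resize:
    print('Max tokens observed:', buffer.runtime.get_max_tokens_observed())
    buffer.runtime.reset_max_tokens_observed()  # e.g. at the start of a new profiling window
```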

#### Undefined-behavior PTX usage

23 changes: 15 additions & 8 deletions csrc/deep_ep.cpp
@@ -12,12 +12,8 @@

namespace deep_ep {

Buffer::Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode, bool explicitly_destroy):
rank(rank), num_ranks(num_ranks),
num_nvl_bytes(num_nvl_bytes), num_rdma_bytes(num_rdma_bytes),
low_latency_mode(low_latency_mode),
explicitly_destroy(explicitly_destroy),
comm_stream(at::cuda::getStreamFromPool(true)) {
Buffer::Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode, bool explicitly_destroy)
    : rank(rank), num_ranks(num_ranks), num_nvl_bytes(num_nvl_bytes), num_rdma_bytes(num_rdma_bytes),
      low_latency_mode(low_latency_mode), explicitly_destroy(explicitly_destroy), comm_stream(at::cuda::getStreamFromPool(true)) {
// Metadata memory
int64_t barrier_signal_bytes = NUM_MAX_NVL_PEERS * sizeof(int);
int64_t buffer_ptr_bytes = NUM_MAX_NVL_PEERS * sizeof(void*);
@@ -59,6 +55,8 @@ Buffer::Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_
}

// Create 32 MiB workspace
CUDA_CHECK(cudaMalloc(&workspace, NUM_WORKSPACE_BYTES));
CUDA_CHECK(cudaMemsetAsync(workspace, 0, NUM_WORKSPACE_BYTES, comm_stream));

@@ -240,8 +238,17 @@ void Buffer::sync(const std::vector<int> &device_ids,
}

std::tuple<torch::Tensor, std::optional<torch::Tensor>, torch::Tensor, torch::Tensor, std::optional<EventHandle>>
Buffer::get_dispatch_layout(const torch::Tensor& topk_idx, int num_experts,
std::optional<EventHandle>& previous_event, bool async, bool allocate_on_comm_stream) {
Buffer::get_dispatch_layout(const torch::Tensor& topk_idx,
int num_experts, std::optional<EventHandle>& previous_event,
bool async, bool allocate_on_comm_stream) {
// Track maximum tokens observed for dynamic buffer resizing
// This is part of the "Easier Potential Overall Design" implementation
// Instead of using queues, we track the maximum tokens to inform buffer allocation
int num_tokens = static_cast<int>(topk_idx.size(0));
if (dynamic_buffer_resize && num_tokens > max_tokens_observed) {
max_tokens_observed = num_tokens;
}

EP_HOST_ASSERT(topk_idx.dim() == 2);
EP_HOST_ASSERT(topk_idx.is_contiguous());
EP_HOST_ASSERT(num_experts > 0);
18 changes: 17 additions & 1 deletion csrc/deep_ep.hpp
@@ -75,6 +75,15 @@ struct Buffer {
// Host-side RDMA-level MoE info
volatile int* moe_recv_rdma_counter = nullptr;
int* moe_recv_rdma_counter_mapped = nullptr;

// Dynamic buffer resizing support
// This implements the "Easier Potential Overall Design" from issue #39
// Instead of using queues for memory efficiency, we allocate buffers based on maximum token counts
// This simplifies the design and eliminates potential deadlocks from queue-based communication
bool dynamic_buffer_resize = false;
int max_tokens_observed = 0;
int buffer_expansion_count = 0;
int buffer_contraction_count = 0;

public:
Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode, bool explicitly_destroy);
@@ -105,6 +114,13 @@

void destroy();

// Functions for dynamic buffer resizing
// Implements the approach suggested in issue #39 for simpler design
void enable_dynamic_buffer_resize(bool enable);
bool is_dynamic_buffer_resize_enabled() const;
int get_max_tokens_observed() const;
void reset_max_tokens_observed();

std::tuple<torch::Tensor, std::optional<torch::Tensor>, torch::Tensor, torch::Tensor, std::optional<EventHandle>>
get_dispatch_layout(const torch::Tensor& topk_idx, int num_experts, std::optional<EventHandle>& previous_event,
bool async, bool allocate_on_comm_stream);
@@ -161,4 +177,4 @@ struct Buffer {
get_next_low_latency_combine_buffer(int num_max_dispatch_tokens_per_rank, int hidden, int num_experts) const;
};

} // namespace deep_ep
} // namespace deep_ep
7 changes: 6 additions & 1 deletion deep_ep/buffer.py
@@ -34,7 +34,8 @@ def __init__(self, group: dist.ProcessGroup,
low_latency_mode: bool = False, num_qps_per_rank: int = 24,
allow_nvlink_for_low_latency_mode: bool = True,
allow_mnnvl: bool = False,
explicitly_destroy: bool = False) -> None:
explicitly_destroy: bool = False,
dynamic_buffer_resize: bool = False) -> None:
"""
Initialize the communication buffer.

@@ -53,6 +54,9 @@ def __init__(self, group: dist.ProcessGroup,
explicitly_destroy: If this flag is set to True, you need to explicitly call `destroy()` to release resources;
otherwise, the resources will be released by the destructor.
Note: Releasing resources in the destructor may cause Python's exception handling process to hang.
dynamic_buffer_resize: Enable dynamic buffer resizing based on actual usage. This simplifies
the design by allocating buffers based on maximum possible tokens rather than using queues.
Refer to https://github.com/deepseek-ai/DeepEP/issues/39 for more details.
"""
check_nvlink_connections(group)

@@ -64,6 +68,7 @@ def __init__(self, group: dist.ProcessGroup,
self.num_rdma_bytes = num_rdma_bytes
self.low_latency_mode = low_latency_mode
self.explicitly_destroy = explicitly_destroy
self.dynamic_buffer_resize = dynamic_buffer_resize
self.runtime = deep_ep_cpp.Buffer(self.rank, self.group_size, num_nvl_bytes, num_rdma_bytes, low_latency_mode, explicitly_destroy)

# Synchronize device IDs