Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
175c6c9
running original deepep
MaoZiming Sep 24, 2025
200cc88
fix
Sep 24, 2025
1cfa40b
conflict
Sep 24, 2025
524a47b
import detect_ib_hca
Sep 24, 2025
997617a
fix
MaoZiming Sep 24, 2025
e6de9b1
Merge branch 'run-original-deepEP' of https://github.com/uccl-project…
MaoZiming Sep 24, 2025
85eae41
ignore draft
MaoZiming Sep 24, 2025
ebc49b0
build on host
MaoZiming Sep 24, 2025
bef4130
ep normal mode
MaoZiming Sep 27, 2025
9d12719
Merge branch 'main' of https://github.com/uccl-project/uccl into ep-n…
MaoZiming Sep 27, 2025
f4cbe97
adding barrier and quiet
MaoZiming Sep 27, 2025
eeb4261
limit nvshmemi_ibgda_quiet and nvshmem_sync_with_same_gpu_idx to thre…
MaoZiming Sep 27, 2025
4e2ff95
adding toy barrier quiet and wait_until_cmd_consumed
MaoZiming Sep 27, 2025
a405f91
wip remote write OOB
MaoZiming Sep 28, 2025
f4189b4
adding hierarchical barrier
MaoZiming Sep 29, 2025
3a3df32
adding translate_dst_rdma_rank
MaoZiming Sep 29, 2025
e5c2acb
fixing low latency buffer idx and rank problem
MaoZiming Sep 29, 2025
1a8de50
debug
MaoZiming Sep 29, 2025
4a1bf15
fix atomic is_combine issue
MaoZiming Oct 1, 2025
cbeeb19
debugging DeepEP dispatch NVL receiver timeout
MaoZiming Oct 1, 2025
0062334
remove extra code
MaoZiming Oct 1, 2025
4198d6b
remove redundant print
MaoZiming Oct 1, 2025
ebf6c5c
git revert relaxed
MaoZiming Oct 1, 2025
22962b1
embed atomics together with write
MaoZiming Oct 2, 2025
56a2eb2
debugging
MaoZiming Oct 3, 2025
74a3690
format
MaoZiming Oct 3, 2025
cb4630f
check
MaoZiming Oct 3, 2025
7bf996f
fix dispatch error
MaoZiming Oct 4, 2025
e8649c2
debugging combine
MaoZiming Oct 4, 2025
087ccfc
seems roughly working
MaoZiming Oct 4, 2025
d832edf
clean and fix
MaoZiming Oct 5, 2025
a46adb4
cleaning
MaoZiming Oct 5, 2025
2dac98e
clean and run
MaoZiming Oct 5, 2025
61ee19e
clean
MaoZiming Oct 5, 2025
133c5b4
revert deepEP tests
MaoZiming Oct 5, 2025
026a7e6
Merge branch 'main' of https://github.com/uccl-project/uccl into ep-n…
MaoZiming Oct 5, 2025
fc8d487
it works!
MaoZiming Oct 5, 2025
ef4dccb
clean
MaoZiming Oct 5, 2025
f0066c0
lower kMaxInflight
MaoZiming Oct 5, 2025
d4b8e3b
code gardening
MaoZiming Oct 5, 2025
02ed37e
clean internode
MaoZiming Oct 5, 2025
2510a44
clean atomic offset bound
MaoZiming Oct 5, 2025
ea7eb5c
clean
MaoZiming Oct 5, 2025
be5b030
code gardening
MaoZiming Oct 5, 2025
a7c2b8f
format
MaoZiming Oct 5, 2025
77b51ea
make low latency mode work too
MaoZiming Oct 5, 2025
ea37719
add compile flag
MaoZiming Oct 5, 2025
391b125
build
MaoZiming Oct 5, 2025
e512d20
trying to make it work on gh200
MaoZiming Oct 5, 2025
34cd57f
revert
MaoZiming Oct 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions ep/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ else
EFA_LDFLAGS := -L$(EFA_HOME)/lib -lefa
endif

# Build-mode switch: when MAKE_NORMAL_MODE is exactly 1, NORMAL_CFLAGS carries
# -DUSE_NORMAL_MODE; for any other value (or when unset) NORMAL_CFLAGS is left
# empty. The selected mode is printed while make parses the file.
ifeq ($(MAKE_NORMAL_MODE),1)
NORMAL_CFLAGS := -DUSE_NORMAL_MODE
$(info Building with normal mode)
else
NORMAL_CFLAGS :=
$(info Building with low latency mode)
endif

ARCH := $(shell uname -m)
GPU_NAME := $(shell nvidia-smi --query-gpu=name --format=csv,noheader | head -n1)
CPU_IS_ARM64 := 0
Expand Down Expand Up @@ -51,12 +59,12 @@ SM ?= 90
CXXFLAGS := -O3 -std=c++17 -Wall -pthread -fPIC -fvisibility=hidden
LDFLAGS := -lpthread -lglog -libverbs -lnl-3 -lnl-route-3 -Xlinker -rpath -Xlinker $(CUDA_PATH)/lib64
NVCCFLAGS := -O3 -std=c++17 -Xcompiler "-Wall -pthread -fPIC -fvisibility=hidden" -ccbin /usr/bin/g++ --expt-relaxed-constexpr
INCLUDES := -Iinclude -I$(CUDA_PATH)/include -I/usr/include -I../include -I../thirdparty/DeepEP/csrc
INCLUDES := -Iinclude -I$(CUDA_PATH)/include -I/usr/include -I../include

CXXFLAGS += $(EFA_CFLAGS) $(GH_CFLAGS)
NVCCFLAGS += $(EFA_CFLAGS) $(GH_CFLAGS)
CXXFLAGS += $(EFA_CFLAGS) $(GH_CFLAGS) $(NORMAL_CFLAGS)
NVCCFLAGS += $(EFA_CFLAGS) $(GH_CFLAGS) $(NORMAL_CFLAGS)
LDFLAGS += $(EFA_LDFLAGS)
INCLUDES += $(EFA_CFLAGS) $(GH_CFLAGS)
INCLUDES += $(EFA_CFLAGS) $(GH_CFLAGS) $(NORMAL_CFLAGS)

SRC_CPP := src/proxy.cpp src/rdma.cpp src/common.cpp src/peer_copy_worker.cpp src/uccl_proxy.cpp src/uccl_bench.cpp src/peer_copy_manager.cpp
SRC_CU := src/gpu_kernel.cu src/peer_copy.cu src/py_cuda_shims.cu src/internode_ll.cu src/internode.cu src/layout.cu src/intranode.cu src/ep_runtime.cu
Expand Down
64 changes: 37 additions & 27 deletions ep/bench/test_internode.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""
This is the same test_internode.py test in DeepEP's repo.

Build:
export OMP_NUM_THREADS=6
export MAKE_NORMAL_MODE=1
make clean && make -j install

On first node:
export OMP_NUM_THREADS=4
torchrun --nnodes=2 --nproc_per_node=8 --node_rank=0 \
--master_addr=10.1.227.34 --master_port=12355 \
bench/test_internode.py --num-tokens=4096 \
--hidden=7168 --num-topk=8 --num-experts=256 --test-ll-compatibility

On second node:
export OMP_NUM_THREADS=4
torchrun --nnodes=2 --nproc_per_node=8 --node_rank=1 \
--master_addr=10.1.227.34 --master_port=12355 \
bench/test_internode.py --num-tokens=4096 \
Expand Down Expand Up @@ -44,12 +47,20 @@
initialize_uccl,
destroy_uccl,
init_dist_under_torchrun,
detect_ib_hca,
)

# Test compatibility with low latency functions
import test_low_latency
from buffer import Buffer
from uccl.ep import Config

try:
from uccl.ep import Config
except ImportError as exc:
import sys

sys.stderr.write("Failed to import uccl.ep\n")
raise


# noinspection PyShadowingNames
Expand Down Expand Up @@ -184,7 +195,6 @@ def check_data(check_x, recv_gbl_rank_prefix_sum):
print(
f'[testing] Running with {"FP8" if isinstance(current_x, tuple) else "BF16"}, {"with" if with_topk else "without"} top-k (async={async_mode}, previous={previous_mode}) ...',
flush=True,
end="",
)
dispatch_args = {
"x": current_x,
Expand Down Expand Up @@ -323,9 +333,6 @@ def check_data(check_x, recv_gbl_rank_prefix_sum):
dispatch_bf16_nvl_recv_bytes = recv_x.numel() * 2
combine_bf16_nvl_send_bytes = dispatch_bf16_nvl_recv_bytes
combine_bf16_rdma_recv_bytes = dispatch_bf16_rdma_send_bytes

if local_rank == 0:
print(" passed", flush=True)
if local_rank == 0:
print("", flush=True)

Expand Down Expand Up @@ -446,8 +453,9 @@ def check_data(check_x, recv_gbl_rank_prefix_sum):


# noinspection PyUnboundLocalVariable,PyShadowingNames
def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace):
num_nodes = int(os.getenv("WORLD_SIZE", 1))
def test_loop(
local_rank: int, num_local_ranks: int, num_nodes: int, args: argparse.Namespace
):
rank, num_ranks, group = init_dist_under_torchrun(local_rank, num_local_ranks)
if args.test_ll_compatibility:
ll_num_tokens, ll_hidden, ll_num_experts, ll_num_topk = 16, 5120, 256, 9
Expand All @@ -464,7 +472,9 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace):
scratch = torch.zeros(
num_rdma_bytes, dtype=torch.uint8, device=f"cuda:{device_index}"
)
proxies, workers = initialize_uccl(scratch, num_rdma_bytes, rank, num_ranks, group)
proxies, workers = initialize_uccl(
scratch, num_rdma_bytes, rank, num_ranks, group, num_experts=args.num_experts
)

buffer = Buffer(
group,
Expand Down Expand Up @@ -501,21 +511,6 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace):
if local_rank == 0:
print("", flush=True)

# Test compatibility with low latency functions
if args.test_ll_compatibility:
buffer.clean_low_latency_buffer(ll_num_tokens, ll_hidden, ll_num_experts)
test_low_latency.test_main(
ll_num_tokens,
ll_hidden,
ll_num_experts,
ll_num_topk,
rank,
num_ranks,
group,
buffer,
seed=1,
)

# Destroy the buffer runtime and communication group
group.barrier()
buffer.destroy()
Expand All @@ -526,6 +521,17 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace):


if __name__ == "__main__":
if os.getenv("MAKE_NORMAL_MODE") != "1":
raise RuntimeError(
"[ERROR] The environment variable MAKE_NORMAL_MODE is not set to 1 (normal mode disabled).\n"
"This script requires normal mode to be active.\n"
"To fix this, run the following before rebuilding:\n"
"export MAKE_NORMAL_MODE=1 && make clean && make -j install\n"
)
ib_dev = detect_ib_hca()
if ib_dev and ib_dev.startswith("mlx"): # Mellanox IB devices show up like mlx5_0
os.environ["NCCL_IB_HCA"] = ib_dev
print(f"Set NCCL_IB_HCA={ib_dev}")
parser = argparse.ArgumentParser(description="Test internode EP kernels")
parser.add_argument(
"--num-processes",
Expand Down Expand Up @@ -557,14 +563,18 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace):
help="whether to test compatibility with low-latency kernels",
)
args = parser.parse_args()
world_size = int(os.environ["WORLD_SIZE"])
local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])
num_nodes = world_size // local_world_size

# Set default `num_topk_groups` if not provided
if args.num_topk_groups is None:
num_nodes = int(os.getenv("WORLD_SIZE", 1))
args.num_topk_groups = min(num_nodes, 4)

num_processes = args.num_processes
if num_processes != 8:
raise ValueError("Only --num-processes=8 is supported for this test.")
# NOTE: modified from deep_ep
local_rank = int(os.environ["LOCAL_RANK"])
num_local_ranks = int(os.environ.get("LOCAL_WORLD_SIZE", 1))
test_loop(local_rank, num_local_ranks, args)
test_loop(local_rank, num_local_ranks, num_nodes, args)
10 changes: 7 additions & 3 deletions ep/bench/test_low_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,13 @@ def test_loop(local_rank: int, num_local_ranks: int, args: argparse.Namespace):


if __name__ == "__main__":
# TODO: you may modify NUMA binding for less CPU overhead
# TODO: buggy with `num_tokens=512`

if os.getenv("MAKE_NORMAL_MODE") == "1":
raise RuntimeError(
"[ERROR] The environment variable MAKE_NORMAL_MODE=1 indicates normal mode is active.\n"
"This test requires low-latency mode.\n"
"To fix this, run the following before rebuilding:\n"
"unset MAKE_NORMAL_MODE && make clean && make -j install\n"
)
ib_dev = detect_ib_hca()
if ib_dev and ib_dev.startswith("mlx"): # Mellanox IB devices show up like mlx5_0
os.environ["NCCL_IB_HCA"] = ib_dev
Expand Down
1 change: 1 addition & 0 deletions ep/bench/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def initialize_uccl(
peer_ip="" if is_intranode else peer_ip,
num_experts=num_experts,
num_ranks=num_ranks,
num_nodes=int(os.environ.get("WORLD_SIZE")) // nproc_per_node,
)
if not is_intranode:
proxy.set_peers_meta(peers_meta_list)
Expand Down
15 changes: 15 additions & 0 deletions ep/include/barrier_local.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once
#include <atomic>
#include <cstdint>

// Number of per-rank slots in LocalBarrier. Defaults to 8; may be overridden
// by defining UCCL_MAX_LOCAL_RANKS before this header is included.
#ifndef UCCL_MAX_LOCAL_RANKS
#define UCCL_MAX_LOCAL_RANKS 8
#endif

// Barrier bookkeeping block holding one arrive/release slot per local rank
// plus a few block-wide fields. Every field is a std::atomic, so individual
// loads and stores are safe without an external lock.
//
// NOTE(review): arrive_seq slots are 32-bit while release_seq and seq are
// 16-bit — confirm that code comparing these values accounts for the 16-bit
// counters wrapping at 65536.
// NOTE(review): presumably index i of arrive_seq/release_seq belongs to local
// rank i — verify against the code that maps and uses this block.
struct LocalBarrier {
  // Per-slot arrival sequence numbers.
  std::atomic<uint32_t> arrive_seq[UCCL_MAX_LOCAL_RANKS];
  // Per-slot release sequence numbers.
  std::atomic<uint16_t> release_seq[UCCL_MAX_LOCAL_RANKS];
  // Block-wide sequence number.
  std::atomic<uint16_t> seq;
  // Used for size/info.
  std::atomic<uint64_t> full_mask;
  // Optional; kept only for debug prints.
  std::atomic<uint64_t> arrived_mask;
};

// The block is meant to be placed in memory that several parties access
// concurrently; a lock-based std::atomic fallback keeps its lock outside the
// object and would silently stop synchronizing in that setting. Fail the build
// instead of misbehaving at run time.
static_assert(UCCL_MAX_LOCAL_RANKS > 0,
              "UCCL_MAX_LOCAL_RANKS must be a positive slot count");
static_assert(std::atomic<uint16_t>::is_always_lock_free &&
                  std::atomic<uint32_t>::is_always_lock_free &&
                  std::atomic<uint64_t>::is_always_lock_free,
              "LocalBarrier requires lock-free 16/32/64-bit atomics");
23 changes: 20 additions & 3 deletions ep/include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
#define MEASURE_PER_OP_LATENCY
#define MEASURE_PER_VERB_LATENCY
// #define USE_SENDER_BARRIER

// #define USE_NORMAL_MODE
#ifndef USE_NORMAL_MODE
#ifndef USE_SENDER_BARRIER
#ifdef EFA
#define USE_RECEIVER_BARRIER
#endif
#endif
#define kAtomicBufferSize 8196
#else
// #define USE_SENDER_BARRIER
#endif

#define kAtomicBufferSize 81960
#define kQueueSize 1024
#define kQueueMask (kQueueSize - 1)
#define kMaxInflight 256
#define kMaxInflight 128
#define kBatchSize 32
#define kIterations 40000
#define kNumThBlocks 4
Expand All @@ -33,14 +40,24 @@
// Both values expand to 4096. The bodies are parenthesized so the macro stays
// a single operand when used inside a larger expression (for example
// `n / kMaxOutstandingRecvs` or `i % kSenderAckQueueDepth`); without the
// parentheses those would parse as `(n / 2048) * 2` and `(i % 2048) * 2`.
#define kMaxOutstandingRecvs (2048 * 2)
#define kSenderAckQueueDepth (2048 * 2)
#define kWarmupOps 10000
#define kRingsPerProxy 8
#define kRemoteBufferSize (kBatchSize * kNumThBlocks * kObjectSize * 100)
#define MAIN_THREAD_CPU_IDX 31
#define MAX_NUM_GPUS 8
#define RECEIVER_BATCH_SIZE 16
#define NVLINK_SM_PER_PROCESS 1
#define kAtomicWrTag 0xa70a000000000000ULL
#define kAtomicMask 0x0000FFFFFFFFFFFFULL
#define kPrintCycleInterval 1000000000ULL
#define kBarrierWrTag 0xbaba000000000000ULL
#define kBarrierMask 0x0000FFFFFFFFFFFFULL
#define kPrintCycleInterval 5000000000ULL
// Base TCP port for Proxy barrier rendezvous (rank0 server)
#define TCP_PORT 18515
#define MAX_RETRIES 100
#define RETRY_DELAY_MS 50
#define QKEY 0x11111111u
#define kLargeAtomicValue 33554352
#define kMaxSendAtomicValue 16383
// P2P enable flags (once per GPU pair)
extern std::once_flag peer_ok_flag[MAX_NUM_GPUS][MAX_NUM_GPUS];
bool pin_thread_to_cpu(int cpu);
Expand Down
1 change: 1 addition & 0 deletions ep/include/ep_configs.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define FINISHED_SUM_TAG 1024
#define NUM_WAIT_NANOSECONDS 500

#define ENABLE_FAST_DEBUG
#ifndef ENABLE_FAST_DEBUG
#define NUM_CPU_TIMEOUT_SECS 100
#define NUM_TIMEOUT_CYCLES 200000000000ull // 200G cycles ~= 100s
Expand Down
7 changes: 5 additions & 2 deletions ep/include/internode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ void notify_dispatch(
int num_max_rdma_chunked_recv_tokens, void** buffer_ptrs,
int num_max_nvl_chunked_recv_tokens, int** barrier_signal_ptrs, int rank,
cudaStream_t stream, int64_t num_rdma_bytes, int64_t num_nvl_bytes,
bool low_latency_mode, uint64_t const* ring_addrs, int num_ring_addrs);
bool low_latency_mode, uint64_t const* ring_addrs, int num_ring_addrs,
void* atomic_buffer_ptr);

void cached_notify(int hidden_int4, int num_scales, int num_topk_idx,
int num_topk_weights, int num_ranks, int num_channels,
Expand All @@ -49,7 +50,9 @@ void cached_notify(int hidden_int4, int num_scales, int num_topk_idx,
void** buffer_ptrs, int num_max_nvl_chunked_recv_tokens,
int** barrier_signal_ptrs, int rank, cudaStream_t stream,
int64_t num_rdma_bytes, int64_t num_nvl_bytes,
bool is_cached_dispatch, bool low_latency_mode);
bool is_cached_dispatch, bool low_latency_mode,
uint64_t const* ring_addrs, int num_ring_addrs,
void* atomic_buffer_ptr);

void dispatch(void* recv_x, float* recv_x_scales, int64_t* recv_topk_idx,
float* recv_topk_weights, void* recv_src_meta, void const* x,
Expand Down
6 changes: 6 additions & 0 deletions ep/include/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Proxy {
bool pin_thread = true;
int num_experts = 0;
int num_ranks = 0;
int num_nodes = 0;
};

explicit Proxy(Config const& cfg) : cfg_(cfg) {
Expand Down Expand Up @@ -90,6 +91,11 @@ class Proxy {
void post_gpu_command(uint64_t& my_tail, size_t& seen);
void post_gpu_commands_mixed(std::vector<uint64_t> const& wrs_to_post,
std::vector<TransferCmd> const& cmds_to_post);
void post_barrier_msg(int dst_rank, bool ack, uint64_t seq);
void send_barrier(uint64_t wr);
void barrier_check();
void quiet(std::vector<uint64_t> wrs, std::vector<TransferCmd> cmds);
void quiet_cq();
Config cfg_;
RDMAConnectionInfo local_info_{}, remote_info_{};

Expand Down
25 changes: 25 additions & 0 deletions ep/include/proxy_ctx.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include "barrier_local.hpp"
#include "util/gpu_rt.h"
#include <infiniband/verbs.h>
#include <atomic>
Expand All @@ -24,6 +25,7 @@ class TokenCounter {

using DispatchTokenKey = std::tuple<int, int, int>;
using CombineTokenKey = std::pair<int, int>;
using NormalTokenKey = std::pair<int, int>;

struct WriteStruct {
int expert_idx;
Expand Down Expand Up @@ -85,9 +87,32 @@ struct ProxyCtx {

TokenCounter<DispatchTokenKey> dispatch_token_counter;
TokenCounter<CombineTokenKey> combine_token_counter;
TokenCounter<NormalTokenKey> normal_token_counter;

/* low_latency_buffer_idx, expert_idx, dst_rank */
std::unordered_map<uint64_t, WriteStruct> wr_id_to_write_struct;
TokenCounter<DispatchTokenKey> dispatch_sent_counter;
TokenCounter<DispatchTokenKey> combine_sent_counter;
TokenCounter<NormalTokenKey> normal_sent_counter;

// Async-barrier state (single inflight assumed)
bool barrier_inflight = false;
uint64_t barrier_seq = 0;
uint64_t barrier_wr = 0;

// Rank-0 bookkeeping
std::vector<uint8_t> barrier_arrived; // size = num_ranks; 1 if arrival seen
int barrier_arrival_count = 0; // arrivals seen (include self)

// Followers: release flag from rank-0
bool barrier_released = false;
uint64_t barrier_release_seq = 0;

// Intra-node (shared-memory) barrier state
LocalBarrier* lb = nullptr; // mapped shared barrier block (per node+thread)
bool lb_owner = false; // we created the shm (so we should unlink on destroy)
int num_local_ranks = 0; // #local ranks on this node
int node_leader_rank = -1; // lowest global rank on this node
int local_rank = -1; // convenience mirror of cfg_.local_rank
int thread_idx = -1; // thread index used in shm name
};
Loading
Loading