DOCA stage split: source + convert #1617

Merged
merged 48 commits into branch-24.06 from doca-split on May 15, 2024

Changes from 1 commit

Commits (48)
716559c
Doca stage split: source + convert
eagonv Apr 11, 2024
9e7a163
Fix code style issues
eagonv Apr 11, 2024
d266ca1
1 thread only for DOCA Source Stage
eagonv Apr 11, 2024
63ea2ad
Progress in column creation
eagonv Apr 12, 2024
1573152
vdb_realtime works
eagonv Apr 13, 2024
537e5ba
VDB works + example directory
eagonv Apr 16, 2024
c456546
Add nemollm to pip dependencies in dev yaml
eagonv Apr 16, 2024
5af237f
Update readme
eagonv Apr 16, 2024
2da4b8c
Syntax fix
eagonv Apr 17, 2024
84cdffa
Merge branch 'branch-24.06' into doca-split
e-ago Apr 17, 2024
8b399e5
Minor update to documentation
eagonv Apr 17, 2024
0b1c263
Merge branch 'doca-split' of github.com:e-ago/MorpheusDoca into doca-…
eagonv Apr 17, 2024
e9e4295
Merge branch 'branch-24.06' into doca-split
e-ago Apr 17, 2024
deb2937
Fix vdb header, fix DOCA cleanup
eagonv Apr 18, 2024
9a2a071
Increase to 2 rx queues
eagonv Apr 18, 2024
c40f048
More fixes
eagonv Apr 19, 2024
b0ed2e5
DocaConvert stage allows dynamic number of packets + fix to python pi…
eagonv Apr 22, 2024
42676be
Fix code style
eagonv Apr 22, 2024
b9534ba
Minor fix
eagonv Apr 22, 2024
86977c2
Upgrade to DOCA 2.7 + minor fixes
eagonv May 9, 2024
000cf18
More improvements and codestyle fix
eagonv May 9, 2024
79dbca3
Doca stage split: source + convert
eagonv Apr 11, 2024
1884553
Fix code style issues
eagonv Apr 11, 2024
acff3b8
1 thread only for DOCA Source Stage
eagonv Apr 11, 2024
1ef468d
Progress in column creation
eagonv Apr 12, 2024
5c83c37
vdb_realtime works
eagonv Apr 13, 2024
a1e7e2d
VDB works + example directory
eagonv Apr 16, 2024
9a565be
Add nemollm to pip dependencies in dev yaml
eagonv Apr 16, 2024
438acb2
Update readme
eagonv Apr 16, 2024
ebbd17a
Syntax fix
eagonv Apr 17, 2024
061d0c0
Minor update to documentation
eagonv Apr 17, 2024
85bf6ee
Fix vdb header, fix DOCA cleanup
eagonv Apr 18, 2024
d4d8ec0
Increase to 2 rx queues
eagonv Apr 18, 2024
a9b63f8
More fixes
eagonv Apr 19, 2024
af24e93
DocaConvert stage allows dynamic number of packets + fix to python pi…
eagonv Apr 22, 2024
5639f28
Fix code style
eagonv Apr 22, 2024
2805669
Minor fix
eagonv Apr 22, 2024
3c0ccee
Upgrade to DOCA 2.7 + minor fixes
eagonv May 9, 2024
9960f44
More improvements and codestyle fix
eagonv May 9, 2024
d85d7e5
Merge branch 'doca-split' of github.com:e-ago/MorpheusDoca into doca-…
eagonv May 9, 2024
a59b53d
Merge remote-tracking branch 'upstream/branch-24.06' into doca-split
mdemoret-nv May 13, 2024
6109b83
Merging and removing commented code.
mdemoret-nv May 13, 2024
b96cdaf
Style cleanup
mdemoret-nv May 13, 2024
101991c
Passed linting.
mdemoret-nv May 14, 2024
427af1d
Resolving comments from feedback
mdemoret-nv May 14, 2024
eeea4a6
Merge pull request #2 from mdemoret-nv/mdd_fix-doca-split
e-ago May 14, 2024
b402693
Style cleanup
mdemoret-nv May 14, 2024
6aa8c70
Style cleanup
mdemoret-nv May 15, 2024
DocaConvert stage allows dynamic number of packets + fix to python pipeline
eagonv committed Apr 22, 2024
commit b0ed2e5b59e0910e18c6bcde652cb676a9f85edc
10 changes: 4 additions & 6 deletions examples/doca/run_udp_convert.py
@@ -24,11 +24,9 @@
from morpheus.stages.doca.doca_convert_stage import DocaConvertStage
from morpheus.stages.doca.doca_source_stage import DocaSourceStage
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage
# from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.utils.logger import configure_logging


@click.command()
@click.option(
"--out_file",
@@ -55,15 +53,15 @@ def run_pipeline(out_file, nic_addr, gpu_addr):
config.mode = PipelineModes.NLP

# Below properties are specified by the command line
config.num_threads = 5
config.num_threads = 7
config.edge_buffer_size = 512

pipeline = LinearPipeline(config)

# add doca source stage
pipeline.set_source(DocaSourceStage(config, nic_addr, gpu_addr, 'udp'))
# pipeline.add_stage(TriggerStage(config))
pipeline.add_stage(DocaConvertStage(config))
pipeline.add_stage(DeserializeStage(config))
pipeline.add_stage(PreprocessNLPStage(config))
pipeline.add_stage(MonitorStage(config, description="DOCA GPUNetIO rate", unit='pkts'))

# Build the pipeline here to see types in the vizualization
6 changes: 3 additions & 3 deletions examples/doca/run_udp_raw.py
@@ -52,7 +52,8 @@ def run_pipeline(out_file, nic_addr, gpu_addr):
config.mode = PipelineModes.NLP

# Below properties are specified by the command line
config.num_threads = 1
config.num_threads = 4
config.edge_buffer_size = 512

def count_raw_packets(message: RawPacketMessage):
return message.num
@@ -61,8 +62,7 @@ def count_raw_packets(message: RawPacketMessage):

# add doca source stage
pipeline.set_source(DocaSourceStage(config, nic_addr, gpu_addr, 'udp'))
pipeline.add_stage(
MonitorStage(config, description="DOCA GPUNetIO rate", unit='pkts', determine_count_fn=count_raw_packets))
pipeline.add_stage(MonitorStage(config, description="DOCA GPUNetIO rate", unit='pkts', determine_count_fn=count_raw_packets))

# Build the pipeline here to see types in the vizualization
pipeline.build()
9 changes: 4 additions & 5 deletions examples/doca/vdb_realtime/vdb.py
@@ -98,17 +98,16 @@ def run_pipeline(out_file, nic_addr, gpu_addr):

config = Config()
config.mode = PipelineModes.NLP
config.pipeline_batch_size = 2048
config.pipeline_batch_size = 1024
config.feature_length = 512

# Below properties are specified by the command line
# config.num_threads = 5
config.edge_buffer_size = 512
config.num_threads = 15

pipeline = LinearPipeline(config)

# add doca source stage
pipeline.set_source(DocaSourceStage(config, nic_addr, gpu_addr, 'udp'))
pipeline.add_stage(DocaConvertStage(config, False))
pipeline.add_stage(DocaConvertStage(config))

pipeline.add_stage(MonitorStage(config, description="DOCA GPUNetIO Source rate", unit='pkts'))

8 changes: 0 additions & 8 deletions models/triton-model-repo/all-MiniLM-L6-v2/config.pbtxt
@@ -40,14 +40,6 @@ optimization {
key: "precision_mode"
value: "FP16"
}
parameters {
key: "trt_engine_cache_enable"
value: "true"
}
parameters {
key: "trt_engine_cache_path"
value: "/models/triton-model-repo/.cache/triton"
}
}
}
}
3 changes: 2 additions & 1 deletion morpheus/_lib/doca/include/morpheus/doca/common.hpp
@@ -29,7 +29,8 @@ uint32_t const PACKETS_PER_BLOCK = PACKETS_PER_THREAD * THREADS_PER_BLOCK;
uint32_t const PACKET_RX_TIMEOUT_NS = 500000; // 1ms //500us

uint32_t const MAX_PKT_RECEIVE = PACKETS_PER_BLOCK;
uint32_t const MAX_PKT_SIZE = 4096;
uint32_t const MAX_PKT_CONVERT = MAX_PKT_RECEIVE * 5;
uint32_t const MAX_PKT_SIZE = 2048;
uint32_t const MAX_PKT_HDR = 64;
uint32_t const MAX_PKT_NUM = 65536;
uint32_t const MAX_QUEUE = 2;
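Taken with the kernel changes later in this commit, these constants mean every converted payload now occupies one fixed 2048-byte slot (MAX_PKT_SIZE), and the new MAX_PKT_CONVERT bounds how many packets a single convert batch may carry. A minimal sketch of the buffer arithmetic this implies, illustrative only and not code from the commit:

#include <cstddef>

// Illustrative: each payload occupies one fixed MAX_PKT_SIZE slot, so a batch of
// N packets needs N * MAX_PKT_SIZE payload bytes on the device.
constexpr std::size_t payload_bytes(std::size_t packet_count, std::size_t slot_size = 2048 /* MAX_PKT_SIZE */)
{
    return packet_count * slot_size;
}
// e.g. payload_bytes(MAX_PKT_CONVERT) bounds the payload column of a full convert batch.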
@@ -35,6 +35,7 @@ std::unique_ptr<cudf::column> gather_payload(
uintptr_t* packets_buffer,
uint32_t* header_sizes,
uint32_t* payload_sizes,
uint32_t* fixed_size_list,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

2 changes: 2 additions & 0 deletions morpheus/_lib/doca/include/morpheus/doca/doca_stages.hpp
@@ -109,6 +109,8 @@ class DocaConvertStage : public mrc::pymrc::PythonNode<std::shared_ptr<RawPacket

cudaStream_t m_stream;
rmm::cuda_stream_view m_stream_cpp;
uint32_t* fixed_size_list;
uint32_t* fixed_size_list_cpu;
};

/****** DocaConvertStageInterfaceProxy***********************/
6 changes: 3 additions & 3 deletions morpheus/_lib/doca/src/doca_context.cpp
@@ -94,11 +94,11 @@ doca_flow_port* init_doca_flow(uint16_t port_id, uint8_t rxq_num)
doca_flow_cfg rxq_flow_cfg = {0};
rte_eth_dev_info dev_info = {nullptr};
rte_eth_conf eth_conf = {
.rxmode =
.rxmode =
{
.mtu = 2048, /* Not really used, just to initialize DPDK */
.mtu = 1024, /* Not really used, just to initialize DPDK */
},
.txmode =
.txmode =
{
.offloads = RTE_ETH_TX_OFFLOAD_IPV4_CKSUM | RTE_ETH_TX_OFFLOAD_UDP_CKSUM | RTE_ETH_TX_OFFLOAD_TCP_CKSUM,
},
23 changes: 18 additions & 5 deletions morpheus/_lib/doca/src/doca_convert.cpp
@@ -90,10 +90,17 @@ DocaConvertStage::DocaConvertStage() :
{
cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking);
m_stream_cpp = rmm::cuda_stream_view(reinterpret_cast<cudaStream_t>(m_stream));
fixed_size_list_cpu = (uint32_t *) calloc(MAX_PKT_RECEIVE * 4, sizeof(uint32_t));
cudaMalloc((void **)&fixed_size_list, MAX_PKT_RECEIVE * 4 * sizeof(uint32_t));
for (int idx = 0; idx < MAX_PKT_RECEIVE * 4; idx++)
fixed_size_list_cpu[idx] = MAX_PKT_SIZE;
cudaMemcpy(fixed_size_list, fixed_size_list_cpu, MAX_PKT_RECEIVE * 4 * sizeof(uint32_t), cudaMemcpyDefault);
}

DocaConvertStage::~DocaConvertStage()
{
free(fixed_size_list_cpu);
cudaFree(fixed_size_list);
cudaStreamDestroy(m_stream);
}

@@ -123,16 +130,22 @@ DocaConvertStage::source_type_t DocaConvertStage::on_raw_packet_message(sink_typ

// LOG(WARNING) << "New RawPacketMessage with " << packet_count << " packets from queue id " << queue_idx;

// const auto t0 = now_ns();

// gather header data
auto header_src_ip_col = cudf::make_column_from_scalar(cudf::string_scalar("111.111.111.111"), packet_count);
auto header_src_ip_addr = header_src_ip_col->mutable_view().data<uint8_t>();
doca::gather_header_scalar(packet_count, pkt_addr_list, pkt_hdr_size_list, pkt_pld_size_list, header_src_ip_addr, m_stream_cpp);

// const auto t1 = now_ns();

// gather payload data
auto payload_col =
doca::gather_payload(packet_count, pkt_addr_list, pkt_hdr_size_list, pkt_pld_size_list, m_stream_cpp);
doca::gather_payload(packet_count, pkt_addr_list,
pkt_hdr_size_list, pkt_pld_size_list,
fixed_size_list, m_stream_cpp);

// const auto gather_payload_stop = now_ns();
// const auto t2 = now_ns();

std::vector<std::unique_ptr<cudf::column>> gathered_columns;
gathered_columns.emplace_back(std::move(header_src_ip_col));
Expand All @@ -141,7 +154,7 @@ DocaConvertStage::source_type_t DocaConvertStage::on_raw_packet_message(sink_typ
// After this point buffers can be reused -> copies actual packets' data
auto gathered_table = std::make_unique<cudf::table>(std::move(gathered_columns));

// const auto gather_table_meta = now_ns();
// const auto t3 = now_ns();

auto gathered_metadata = cudf::io::table_metadata();
gathered_metadata.schema_info.emplace_back("src_ip");
Expand All @@ -150,11 +163,11 @@ DocaConvertStage::source_type_t DocaConvertStage::on_raw_packet_message(sink_typ
auto gathered_table_w_metadata =
cudf::io::table_with_metadata{std::move(gathered_table), std::move(gathered_metadata)};

// const auto create_message_cpp = now_ns();
// const auto t4 = now_ns();

auto meta = MessageMeta::create_from_cpp(std::move(gathered_table_w_metadata), 0);

// const auto gather_meta_stop = now_ns();
// const auto t5 = now_ns();

cudaStreamSynchronize(m_stream_cpp);

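The constructor above stages a device array of MAX_PKT_RECEIVE * 4 row widths, every entry set to MAX_PKT_SIZE, and on_raw_packet_message now forwards it to doca::gather_payload as the per-packet row size. For comparison, a sketch of an equivalent setup that fills the array directly on the device with Thrust rather than staging through a calloc'd host buffer; this is an illustrative alternative (it assumes CUDA-compiled code), not the version the commit adds:

#include <cuda_runtime.h>
#include <thrust/device_ptr.h>
#include <thrust/fill.h>
#include <thrust/system/cuda/execution_policy.h>
#include <cstddef>
#include <cstdint>

// Illustrative alternative: allocate the fixed row-size list and fill it on the
// device, so no host-side staging buffer has to be kept around and freed later.
uint32_t* make_fixed_size_list(std::size_t num_entries, uint32_t row_size, cudaStream_t stream)
{
    uint32_t* d_sizes = nullptr;
    cudaMalloc(reinterpret_cast<void**>(&d_sizes), num_entries * sizeof(uint32_t));
    thrust::fill(thrust::cuda::par.on(stream),
                 thrust::device_pointer_cast(d_sizes),
                 thrust::device_pointer_cast(d_sizes) + num_entries,
                 row_size);  // every row is MAX_PKT_SIZE wide
    return d_sizes;
}
// e.g. fixed_size_list = make_fixed_size_list(MAX_PKT_RECEIVE * 4, MAX_PKT_SIZE, m_stream);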
22 changes: 19 additions & 3 deletions morpheus/_lib/doca/src/doca_convert_kernel.cu
@@ -48,9 +48,23 @@ __global__ void _packet_gather_payload_kernel(
uintptr_t* packets_buffer,
uint32_t* header_sizes,
uint32_t* payload_sizes,
uint8_t* payload_chars_out
uint8_t* payload_chars_out
)
{
int pkt_idx = threadIdx.x;
int j = 0;

while (pkt_idx < packet_count) {
uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx] + header_sizes[pkt_idx]);
for (j = 0; j < payload_sizes[pkt_idx]; j++)
payload_chars_out[(MAX_PKT_SIZE * pkt_idx) + j] = pkt_hdr_addr[j];
for (; j < MAX_PKT_SIZE; j++)
payload_chars_out[(MAX_PKT_SIZE * pkt_idx) + j] = '\0';
pkt_idx += blockDim.x;
}

#if 0

// Specialize BlockScan for a 1D block of 128 threads of type int
using BlockScan = cub::BlockScan<int32_t, THREADS_PER_BLOCK>;
// Allocate shared memory for BlockScan
@@ -86,6 +100,7 @@ __global__ void _packet_gather_payload_kernel(
// packets_buffer[packet_idx]);
}
}
#endif
}

__global__ void _packet_gather_header_kernel(
@@ -115,12 +130,13 @@ std::unique_ptr<cudf::column> gather_payload(
uintptr_t* packets_buffer,
uint32_t* header_sizes,
uint32_t* payload_sizes,
uint32_t* fixed_size_list,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto [offsets_column, bytes] = cudf::detail::make_offsets_child_column(
payload_sizes,
payload_sizes + packet_count,
fixed_size_list,
fixed_size_list + packet_count,
stream,
mr
);
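The net effect of the rewritten gather kernel plus this offsets change is a fixed-width strings column: each payload is copied into its own MAX_PKT_SIZE slot and NUL-padded, and the offsets come from fixed_size_list rather than the real payload sizes, so row i always starts at i * MAX_PKT_SIZE. A small self-contained illustration of the resulting offsets (conceptual sketch; the actual offsets child column is built by cudf on the device):

#include <cstddef>
#include <vector>

// Conceptual view of the offsets derived from fixed_size_list: every row size is
// MAX_PKT_SIZE (2048 per the common.hpp hunk above), independent of the packet's
// real payload length, because the kernel NUL-pads the unused tail of each slot.
std::vector<std::size_t> fixed_slot_offsets(std::size_t packet_count, std::size_t slot_size = 2048)
{
    std::vector<std::size_t> offsets(packet_count + 1);
    for (std::size_t i = 0; i <= packet_count; ++i)
    {
        offsets[i] = i * slot_size;
    }
    return offsets;  // packet_count = 3  ->  {0, 2048, 4096, 6144}
}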
22 changes: 11 additions & 11 deletions morpheus/_lib/doca/src/doca_source.cpp
@@ -147,7 +147,7 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build()
continue;
}

const auto start_kernel = now_ns();
// const auto start_kernel = now_ns();

// Assume MAX_QUEUE == 2
morpheus::doca::packet_receive_kernel(m_rxq[0]->rxq_info_gpu(), m_rxq[1]->rxq_info_gpu(),
@@ -158,11 +158,11 @@
rstream);
cudaStreamSynchronize(rstream);

const auto end_kernel = now_ns();
// const auto end_kernel = now_ns();

for (int queue_idx = 0; queue_idx < MAX_QUEUE; queue_idx++) {
if (m_semaphore[queue_idx]->is_ready(sem_idx[queue_idx])) {
const auto start_sem = now_ns();
// const auto start_sem = now_ns();
// LOG(WARNING) << "CPU READY sem " << sem_idx[queue_idx] << " queue " << thread_idx << std::endl;

pkt_ptr = static_cast<struct packets_info*>(m_semaphore[queue_idx]->get_info_cpu(sem_idx[queue_idx]));
@@ -182,21 +182,21 @@
true,
queue_idx);

const auto create_msg = now_ns();
// const auto create_msg = now_ns();

output.on_next(std::move(meta));

m_semaphore[queue_idx]->set_free(sem_idx[queue_idx]);
sem_idx[queue_idx] = (sem_idx[queue_idx] + 1) % MAX_SEM_X_QUEUE;

const auto end = now_ns();
// const auto end = now_ns();

LOG(WARNING) << "Queue " << queue_idx
<< " packets " << pkt_ptr->packet_count_out
<< " kernel time ns " << end_kernel - start_kernel
<< " Sem + msg ns " << create_msg - start_sem
<< " End ns " << end - create_msg
<< std::endl;
// LOG(WARNING) << "Queue " << queue_idx
// << " packets " << pkt_ptr->packet_count_out
// << " kernel time ns " << end_kernel - start_kernel
// << " Sem + msg ns " << create_msg - start_sem
// << " End ns " << end - create_msg
// << std::endl;
}
}
}
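The per-batch timing that used to go to LOG(WARNING) is commented out here rather than deleted. If that instrumentation is ever wanted again without re-editing the hot loop, one option is a small scoped timer guarded by a compile-time switch. The sketch below is illustrative only: it uses std::chrono in place of the file's now_ns() helper, and MORPHEUS_DOCA_TIMING is a hypothetical macro, not something this commit defines:

#include <chrono>
#include <glog/logging.h>

// Illustrative scoped timer: logs elapsed nanoseconds for a labeled region when
// MORPHEUS_DOCA_TIMING (hypothetical flag) is defined, and stays silent otherwise.
struct ScopedNsTimer
{
    const char* label;
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

    ~ScopedNsTimer()
    {
#ifdef MORPHEUS_DOCA_TIMING
        const auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                            std::chrono::steady_clock::now() - start)
                            .count();
        LOG(WARNING) << label << " took " << ns << " ns";
#endif
    }
};
// usage: { ScopedNsTimer t{"packet_receive_kernel"}; /* kernel launch + sync */ }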
2 changes: 1 addition & 1 deletion morpheus/stages/doca/doca_convert_stage.py
@@ -53,7 +53,7 @@ def __init__(self, c: Config):
"Ensure the DOCA components have been built and installed. Error message: ") +
ex.msg) from ex

self._max_concurrent = c.num_threads
self._max_concurrent = 3

@property
def name(self) -> str:
4 changes: 2 additions & 2 deletions morpheus/stages/doca/doca_source_stage.py
@@ -65,7 +65,6 @@ def __init__(

self._batch_size = c.pipeline_batch_size
self._input_count = None
self._max_concurrent = 1 # Only 1 thread is enough for 2 queues
self._nic_pci_address = nic_pci_address
self._gpu_pci_address = gpu_pci_address
self._traffic_type = traffic_type.lower()
@@ -96,7 +95,8 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
self._nic_pci_address,
self._gpu_pci_address,
self._traffic_type)
# node.launch_options.pe_count = self._max_concurrent
# Only 1 thread is enough for 2 queues
node.launch_options.pe_count = 1
return node

raise NotImplementedError("Does not support Python nodes")