Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion init/config2args.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,20 @@ def process_input_ndp_plugin(settings):
if device is None:
raise ValueError("device must be specified in the ndp plugin configuration.")

process_plugins = settings.get("device", [])
if not isinstance(process_plugins, list):
raise ValueError("Invalid process plugins configuration format.")

res = ','.join(process_plugins)

queues = settings.get("queues")
if queues is None:
raise ValueError("queues must be specified in the ndp plugin configuration.")

# Parse the queues
parsed_queues = parse_ndp_queues(queues)

params = [f'-i "ndp;dev={device}:{queue_id}"' for queue_id in parsed_queues]
params = [f'-i "ndp;dev={res}:{queue_id}"' for queue_id in parsed_queues]
return " ".join(params)

def process_input_pcap_file_plugin(settings):
Expand Down
7 changes: 5 additions & 2 deletions init/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@
"ndp": {
"type": "object",
"properties": {
"device": {
"type": "string"
"device": {
"type": "array",
"items": {
"type": "string"
}
},
"queues": {
"type": "string"
Expand Down
101 changes: 83 additions & 18 deletions src/plugins/input/nfb/src/ndp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

#include "parser.hpp"

#include <algorithm>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <span>

#include <ipfixprobe/pluginFactory/pluginManifest.hpp>
#include <ipfixprobe/pluginFactory/pluginRegistrar.hpp>
Expand All @@ -43,7 +46,33 @@ static const PluginManifest ndpPluginManifest = {
},
};

static std::vector<std::string> parseDevices(const std::string& input)
{
std::vector<std::string> result;

size_t colon_pos = input.find_last_of(':');
std::string suffix;
std::string devices;

if (colon_pos != std::string::npos) {
devices = input.substr(0, colon_pos);
suffix = input.substr(colon_pos);
} else {
devices = input;
suffix = "";
}

std::stringstream ss(devices);
std::string dev;
while (std::getline(ss, dev, ',')) {
result.push_back(dev + suffix);
}

return result;
}

NdpPacketReader::NdpPacketReader(const std::string& params)
: ndp_packet_burst(new ndp_packet[64])
{
init(params.c_str());
}
Expand All @@ -65,18 +94,29 @@ void NdpPacketReader::init(const char* params)
if (parser.m_dev.empty()) {
throw PluginError("specify device path");
}

init_ifc(parser.m_dev);
}

void NdpPacketReader::close()
{
ndpReader.close();
for (size_t i = 0; i < m_readers_count; i++) {
ndpReader[i].close();
}
}

void NdpPacketReader::init_ifc(const std::string& dev)
{
if (ndpReader.init_interface(dev) != 0) {
throw PluginError(ndpReader.error_msg);
const std::vector<std::string> devs = parseDevices(dev);
m_readers_count = devs.size();
if (m_readers_count > 2) {
throw PluginError("too many devices specified");
}

for (size_t i = 0; i < m_readers_count; i++) {
if (ndpReader[i].init_interface(devs[i]) != 0) {
throw PluginError(ndpReader[i].error_msg);
}
}
}

Expand All @@ -85,38 +125,63 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock& packets)
parser_opt_t opt = {&packets, false, false, 0};
struct ndp_packet* ndp_packet;
struct timeval timestamp;
size_t read_pkts = 0;
int ret = -1;

packets.cnt = 0;
for (unsigned i = 0; i < packets.size; i++) {
ret = ndpReader.get_pkt(&ndp_packet, &timestamp);
if (ret == 0) {
if (opt.pblock->cnt) {
break;
}
return Result::TIMEOUT;
} else if (ret < 0) {
// Error occured.
throw PluginError(ndpReader.error_msg);
constexpr size_t maxBurstSize = 64;
size_t burstSize = std::min(packets.size, maxBurstSize);
std::span<struct ndp_packet> packetSpan(ndp_packet_burst.get(), burstSize);
std::span<timeval> timestampSpan(timestamps);

size_t reader_index = (m_reader_idx++) & (m_readers_count - 1);
NdpReader& reader = ndpReader[reader_index];
int received = reader.get_packets(packetSpan, timestampSpan);

if (received < 32) {
std::span<struct ndp_packet> packetSpan(
ndp_packet_burst.get() + received,
burstSize - received);
std::span<timeval> timestampSpan(timestamps.data() + received, burstSize - received);

size_t reader_index = (m_reader_idx++) & (m_readers_count - 1);
NdpReader& reader = ndpReader[reader_index];
received += reader.get_packets(packetSpan, timestampSpan);
}

for (unsigned i = 0; i < received; ++i) {
ndp_packet = &ndp_packet_burst[i];
timestamp = timestamps[i];

if (ndp_packet->data_length == 0) {
continue; // Skip empty packets
}
read_pkts++;

parse_packet(
&opt,
m_parser_stats,
timestamp,
ndp_packet->data,
ndp_packet->data_length,
ndp_packet->data_length);

if (opt.pblock->cnt >= packets.size) {
break;
}
}

m_seen += read_pkts;
m_seen += received;
m_parsed += opt.pblock->cnt;

m_stats.receivedPackets += read_pkts;
m_stats.receivedPackets += received;
m_stats.receivedBytes += packets.bytes;

return opt.pblock->cnt ? Result::PARSED : Result::NOT_PARSED;
if (opt.pblock->cnt) {
return Result::PARSED;
} else if (received == 0) {
return Result::TIMEOUT;
} else {
return Result::NOT_PARSED;
}
}

void NdpPacketReader::configure_telemetry_dirs(
Expand Down
10 changes: 9 additions & 1 deletion src/plugins/input/nfb/src/ndp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#include "ndpReader.hpp"

#include <memory>
#include <span>

#include <ipfixprobe/inputPlugin.hpp>
#include <ipfixprobe/options.hpp>
#include <ipfixprobe/packet.hpp>
Expand Down Expand Up @@ -81,9 +84,14 @@ class NdpPacketReader : public InputPlugin {

telemetry::Content get_queue_telemetry();

NdpReader ndpReader;
NdpReader ndpReader[2];
std::size_t m_readers_count;
uint64_t m_reader_idx = 0;
RxStats m_stats = {};

std::unique_ptr<struct ndp_packet[]> ndp_packet_burst;
std::array<timeval, 64> timestamps;

void init_ifc(const std::string& dev);
};

Expand Down
40 changes: 40 additions & 0 deletions src/plugins/input/nfb/src/ndpReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,46 @@ void NdpReader::set_sw_timestamp(struct timeval* tv)
tv->tv_usec = micros;
}

int NdpReader::get_packets(std::span<struct ndp_packet> packets, std::span<timeval> timestamps)
{
if (blocked_packets > 128) {
ndp_rx_burst_put(rx_handle);
blocked_packets = 0;
}

const unsigned received = ndp_rx_burst_get(rx_handle, packets.data(), packets.size());
for (unsigned i = 0; i < received; i++) {
struct ndp_packet* ndp_packet = &packets[i];
if (fw_type == NdpFwType::NDP_FW_HANIC) {
uint64_t* fw_ts = &((NdpHeader*) (ndp_packet->header))->timestamp;
if (*fw_ts == 0) {
set_sw_timestamp((struct timeval*) &timestamps[i]);
} else {
convert_fw_ts_to_timeval(fw_ts, (struct timeval*) &timestamps[i]);
}
} else {
uint8_t header_id = ndp_packet_flag_header_id_get(ndp_packet);
if (header_id >= ndk_timestamp_offsets.size()) {
set_sw_timestamp((struct timeval*) &timestamps[i]);
} else if (ndk_timestamp_offsets[header_id] == std::numeric_limits<uint32_t>::max()) {
set_sw_timestamp((struct timeval*) &timestamps[i]);
} else {
uint64_t* fw_ts = (uint64_t*) ((uint8_t*) ndp_packet->header
+ ndk_timestamp_offsets[header_id]);
if (*fw_ts == std::numeric_limits<uint64_t>::max()) {
set_sw_timestamp((struct timeval*) &timestamps[i]);
} else {
convert_fw_ts_to_timeval(fw_ts, (struct timeval*) &timestamps[i]);
}
}
}
}

blocked_packets += received;

return received;
}

int NdpReader::get_pkt(struct ndp_packet** ndp_packet_out, struct timeval* timestamp)
{
if (ndp_packet_buffer_processed >= ndp_packet_buffer_packets) {
Expand Down
5 changes: 5 additions & 0 deletions src/plugins/input/nfb/src/ndpReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "ndpHeader.hpp"

#include <span>
#include <string>
#include <vector>

Expand Down Expand Up @@ -49,6 +50,8 @@ class NdpReader {
int get_pkt(struct ndp_packet** ndp_packet, struct timeval* timestamp);
std::string error_msg;

int get_packets(std::span<struct ndp_packet> packets, std::span<timeval> timestamps);

private:
void set_booted_fw();
void convert_fw_ts_to_timeval(const uint64_t* fw_ts, struct timeval* tv);
Expand All @@ -60,6 +63,8 @@ class NdpReader {
uint16_t packet_bufferSize;
uint64_t timeout;

uint64_t blocked_packets = 0;

NdpFwType fw_type;
std::vector<uint32_t> ndk_timestamp_offsets;

Expand Down