Skip to content

Commit

Permalink
Merge branch 'tuvok-separate-parsing-library'
Browse files Browse the repository at this point in the history
closes #14
  • Loading branch information
mguentner committed Aug 21, 2017
2 parents 775130f + c919e9f commit e3ac739
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 102 deletions.
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ add_library(addsources STATIC
timer.cpp
udpthread.cpp
canthread.cpp)

add_library(cannelloni-common SHARED
parser.cpp)

if(SCTP_SUPPORT)
add_library(sctpthread STATIC sctpthread.cpp)
target_link_libraries(sctpthread addsources sctp)
target_link_libraries(addsources sctpthread)
endif(SCTP_SUPPORT)
set_target_properties(addsources PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries(cannelloni addsources pthread)
target_link_libraries(cannelloni addsources cannelloni-common pthread)

install(TARGETS cannelloni DESTINATION bin)
install(TARGETS cannelloni-common DESTINATION lib)
118 changes: 118 additions & 0 deletions parser.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#include "parser.h"

#include <arpa/inet.h>
#include <string.h>

void parseFrames(uint16_t len, const uint8_t* buffer, std::function<canfd_frame*()> frameAllocator,
std::function<void(canfd_frame*, bool)> frameReceiver)
{
using namespace cannelloni;

const struct CannelloniDataPacket* data;
/* Check for OP Code */
data = reinterpret_cast<const struct CannelloniDataPacket*> (buffer);
if (data->version != CANNELLONI_FRAME_VERSION)
throw std::runtime_error("Received wrong version");

if (data->op_code != DATA)
throw std::runtime_error("Received wrong OP code");

if (ntohs(data->count) == 0)
return; // Empty packets silently ignored

const uint8_t* rawData = buffer + CANNELLONI_DATA_PACKET_BASE_SIZE;

for (uint16_t i = 0; i < ntohs(data->count); i++)
{
if (rawData - buffer + CANNELLONI_FRAME_BASE_SIZE > len)
throw std::runtime_error("Received incomplete packet");

/* We got at least a complete canfd_frame header */
canfd_frame* frame = frameAllocator();
if (!frame)
throw std::runtime_error("Allocation error.");

canid_t tmp;
memcpy(&tmp, rawData, sizeof (canid_t));
frame->can_id = ntohl(tmp);
/* += 4 */
rawData += sizeof (canid_t);
frame->len = *rawData;
/* += 1 */
rawData += sizeof (frame->len);
/* If this is a CAN FD frame, also retrieve the flags */
if (frame->len & CANFD_FRAME)
{
frame->flags = *rawData;
/* += 1 */
rawData += sizeof (frame->flags);
}
/* RTR Frames have no data section although they have a dlc */
if ((frame->can_id & CAN_RTR_FLAG) == 0)
{
/* Check again now that we know the dlc */
if (rawData - buffer + canfd_len(frame) > len)
{
frame->len = 0;
frameReceiver(frame, false);

throw std::runtime_error("Received incomplete packet / can header corrupt!");
}

memcpy(frame->data, rawData, canfd_len(frame));
rawData += canfd_len(frame);
}

frameReceiver(frame, true);
}
}

uint8_t* buildPacket(uint16_t len, uint8_t* packetBuffer,
std::list<canfd_frame*>& frames, uint8_t seqNo,
std::function<void(std::list<canfd_frame*>&, std::list<canfd_frame*>::iterator)> handleOverflow)
{
using namespace cannelloni;

uint16_t frameCount = 0;
uint8_t* data = packetBuffer + CANNELLONI_DATA_PACKET_BASE_SIZE;
for (auto it = frames.begin(); it != frames.end(); it++)
{
canfd_frame* frame = *it;
/* Check for packet overflow */
if ((data - packetBuffer + CANNELLONI_FRAME_BASE_SIZE + canfd_len(frame)
+ ((frame->len & CANFD_FRAME) ? sizeof(frame->flags) : 0))
> len)
{
handleOverflow(frames, it);
break;
}
canid_t tmp = htonl(frame->can_id);
memcpy(data, &tmp, sizeof(canid_t));
/* += 4 */
data += sizeof(canid_t);
*data = frame->len;
/* += 1 */
data += sizeof(frame->len);
/* If this is a CAN FD frame, also send the flags */
if (frame->len & CANFD_FRAME)
{
*data = frame->flags;
/* += 1 */
data += sizeof(frame->flags);
}
if ((frame->can_id & CAN_RTR_FLAG) == 0)
{
memcpy(data, frame->data, canfd_len(frame));
data += canfd_len(frame);
}
frameCount++;
}
struct CannelloniDataPacket* dataPacket;
dataPacket = (struct CannelloniDataPacket*) (packetBuffer);
dataPacket->version = CANNELLONI_FRAME_VERSION;
dataPacket->op_code = DATA;
dataPacket->seq_no = seqNo;
dataPacket->count = htons(frameCount);

return data;
}
48 changes: 48 additions & 0 deletions parser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#ifndef PARSER_H_
#define PARSER_H_

#include "cannelloni.h"

#include <linux/can.h>

#include <functional>
#include <list>

/**
* Parses Cannelloni packet and extracts CAN frames
* If frameAllocator allocates heap memory or reserves resources in some preallocated buffer
* you need to remember to free those resources up in the frameReceiver implementation.
* parseFrames function does not claim ownership of resources allocated by frameAllocator.
* In the case when incomplete packet is received frameReceiver will be passed a frame
* with len set to 0, so that proper deallocation of resources can be done there.
*
* @param len Buffer length
* @param buffer Pointer to buffer containing Cannelloni packet
* @param frameAllocator Callback responsible for providing memory for CAN frame. Returns pointer
* to allocated frame.
* @param frameReceiver Callback responsible for handling newly read CAN frame. First argument
* is pointer to frame allocated by frameAllocator and filled with data by parseFrames.
* Second argument tells whether frame was read with success and is correct (can be processed
* further), or whether there was some error and it should not be processed (apart from
* being deallocated)
*/
void parseFrames(uint16_t len, const uint8_t* buffer,
std::function<canfd_frame*()> frameAllocator,
std::function<void(canfd_frame*, bool)> frameReceiver);

/**
* Builds Cannelloni packet from provided list of CAN frames
* @param len Buffer length
* @param packetBuffer Pointer to buffer that will contain Cannelloni packet
* @param frames Reference to list of pointers to CAN frames
* @param seqNo Packet sequence number
* @param handleOverflow Callback responsible for handling CAN frames that did't fit
* into Cannelloni package. First argument is a frames list reference, second argument
* is iterator to the first not handled frame.
* @return
*/
uint8_t* buildPacket(uint16_t len, uint8_t* packetBuffer,
std::list<canfd_frame*>& frames, uint8_t seqNo,
std::function<void(std::list<canfd_frame*>&, std::list<canfd_frame*>::iterator)> handleOverflow);

#endif /* PARSER_H_ */
145 changes: 44 additions & 101 deletions udpthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "udpthread.h"
#include "logging.h"
#include "make_unique.h"
#include "parser.h"

UDPThread::UDPThread(const struct debugOptions_t &debugOptions,
const struct sockaddr_in &remoteAddr,
Expand Down Expand Up @@ -79,8 +80,9 @@ void UDPThread::stop() {
m_blockTimer.fire();
}



bool UDPThread::parsePacket(uint8_t *buffer, uint16_t len, struct sockaddr_in &clientAddr) {
bool error = false;
char clientAddrStr[INET_ADDRSTRLEN];

if (inet_ntop(AF_INET, &clientAddr.sin_addr, clientAddrStr, INET_ADDRSTRLEN) == NULL) {
Expand All @@ -91,75 +93,43 @@ bool UDPThread::parsePacket(uint8_t *buffer, uint16_t len, struct sockaddr_in &c
lwarn << "Received a packet from " << clientAddrStr
<< ", which is not set as a remote." << std::endl;
} else {
struct CannelloniDataPacket *data;
/* Check for OP Code */
data = (struct CannelloniDataPacket*) buffer;
if (data->version != CANNELLONI_FRAME_VERSION) {
lwarn << "Received wrong version" << std::endl;
error = true;
}
if (data->op_code != DATA) {
lwarn << "Received wrong OP code" << std::endl;
error = true;
}
if (ntohs(data->count) == 0) {
linfo << "Received empty packet" << std::endl;
error = true;
}
if (!error) {
uint8_t *rawData = buffer+CANNELLONI_DATA_PACKET_BASE_SIZE;

if (m_debugOptions.udp) {
linfo << "Received " << std::dec << len << " Bytes from Host " << clientAddrStr
<< ":" << ntohs(clientAddr.sin_port) << std::endl;
linfo << "Received " << std::dec << len << " Bytes from Host " << clientAddrStr
<< ":" << ntohs(clientAddr.sin_port) << std::endl;
}
m_rxCount++;
for (uint16_t i=0; i<ntohs(data->count); i++) {
if (rawData-buffer+CANNELLONI_FRAME_BASE_SIZE > len) {
lerror << "Received incomplete packet" << std::endl;
error = true;
break;
}
/* We got at least a complete canfd_frame header */
canfd_frame *frame = m_peerThread->getFrameBuffer()->requestFrame(true, m_debugOptions.buffer);
if (!frame) {
lerror << "Allocation error." << std::endl;
error = true;
break;
}
canid_t tmp;
memcpy(&tmp, rawData, sizeof(canid_t));
frame->can_id = ntohl(tmp);
/* += 4 */
rawData += sizeof(canid_t);
frame->len = *rawData;
/* += 1 */
rawData += sizeof(frame->len);
/* If this is a CAN FD frame, also retrieve the flags */
if (frame->len & CANFD_FRAME) {
frame->flags = *rawData;
/* += 1 */
rawData += sizeof(frame->flags);
}
/* RTR Frames have no data section although they have a dlc */
if ((frame->can_id & CAN_RTR_FLAG) == 0) {
/* Check again now that we know the dlc */
if (rawData-buffer+canfd_len(frame) > len) {
lerror << "Received incomplete packet / can header corrupt!" << std::endl;
error = true;
break;

auto allocator = [this]()
{
return m_peerThread->getFrameBuffer()->requestFrame(true, m_debugOptions.buffer);
};
auto receiver = [this](canfd_frame* f, bool success)
{
if (!success)
{
m_peerThread->getFrameBuffer()->insertFramePool(f);
return;
}
memcpy(frame->data, rawData, canfd_len(frame));
rawData += canfd_len(frame);
}
m_peerThread->transmitFrame(frame);
if (m_debugOptions.can) {
printCANInfo(frame);
}

m_peerThread->transmitFrame(f);
if (m_debugOptions.can)
{
printCANInfo(f);
}
};
try
{
parseFrames(len, buffer, allocator, receiver);
m_rxCount++;
}
catch(std::exception& e)
{
lerror << e.what();
return true;
}
}
}
}
return !error;
return false;
}

void UDPThread::run() {
Expand Down Expand Up @@ -282,10 +252,10 @@ void UDPThread::prepareBuffer() {
// compile time.
auto bufWrap = std::make_unique<uint8_t[]>(m_payloadSize);
auto packetBuffer = bufWrap.get();
uint8_t *data;

ssize_t transmittedBytes = 0;
uint16_t frameCount = 0;
struct CannelloniDataPacket *dataPacket;


struct timeval currentTime;

m_frameBuffer->swapBuffers();
Expand All @@ -294,42 +264,15 @@ void UDPThread::prepareBuffer() {

std::list<canfd_frame*> *buffer = m_frameBuffer->getIntermediateBuffer();

data = packetBuffer+CANNELLONI_DATA_PACKET_BASE_SIZE;
for (auto it = buffer->begin(); it != buffer->end(); it++) {
canfd_frame *frame = *it;
/* Check for packet overflow */
if ((data-packetBuffer
+CANNELLONI_FRAME_BASE_SIZE
+canfd_len(frame)
+((frame->len & CANFD_FRAME)?sizeof(frame->flags):0)) > m_payloadSize) {
auto overflowHandler = [this](std::list<canfd_frame*>&, std::list<canfd_frame*>::iterator it)
{
/* Move all remaining frames back to m_buffer */
m_frameBuffer->returnIntermediateBuffer(it);
break;
}
canid_t tmp = htonl(frame->can_id);
memcpy(data, &tmp, sizeof(canid_t));
/* += 4 */
data += sizeof(canid_t);
*data = frame->len;
/* += 1 */
data += sizeof(frame->len);
/* If this is a CAN FD frame, also send the flags */
if (frame->len & CANFD_FRAME) {
*data = frame->flags;
/* += 1 */
data += sizeof(frame->flags);
}
if ((frame->can_id & CAN_RTR_FLAG) == 0) {
memcpy(data, frame->data, canfd_len(frame));
data+=canfd_len(frame);
}
frameCount++;
}
dataPacket = (struct CannelloniDataPacket*) packetBuffer;
dataPacket->version = CANNELLONI_FRAME_VERSION;
dataPacket->op_code = DATA;
dataPacket->seq_no = m_sequenceNumber++;
dataPacket->count = htons(frameCount);
};

uint8_t* data = buildPacket(m_payloadSize, packetBuffer, *buffer,
m_sequenceNumber++, overflowHandler);

transmittedBytes = sendBuffer(packetBuffer, data-packetBuffer);
if (transmittedBytes != data-packetBuffer) {
lerror << "UDP Socket error. Error while transmitting" << std::endl;
Expand Down

0 comments on commit e3ac739

Please sign in to comment.